package hamster.csustef.acquisition.flink.transfer;

import hamster.csustef.acquisition.api.WeatherApi;
import hamster.csustef.acquisition.constant.AcquisitionConstant;
import hamster.csustef.common.util.DateUtil;
import org.apache.flink.api.common.functions.MapFunction;

/**
 * Flink {@link MapFunction} that passes each incoming message to
 * {@code WeatherApi.getWeather} and turns the boolean outcome into a
 * status line.
 *
 * <p>The emitted value has the form {@code STATUS,timestamp}, where
 * {@code STATUS} is {@code SUCCESS} or {@code ERROR} and the timestamp is
 * whatever {@code DateUtil.getNowDateTime()} returns at the moment the call
 * completes.
 */
public class GetWeatherMapFunction implements MapFunction<String, String> {
    /** Delimiter placed between the status word and the timestamp. */
    private String delimiter = ",";

    /**
     * Invokes the weather API for one message and reports how it went.
     *
     * @param message the input record, forwarded unchanged as the third
     *                argument of {@code WeatherApi.getWeather}
     *                (NOTE(review): its expected format is not visible here)
     * @return {@code "SUCCESS"} or {@code "ERROR"}, followed by the delimiter
     *         and the current date-time from {@code DateUtil}
     * @throws Exception if the underlying API call throws; nothing is caught here
     */
    @Override
    public String map(String message) throws Exception {
        // The API call is made first so the timestamp reflects when it finished.
        boolean succeeded = WeatherApi.getWeather(
                AcquisitionConstant.KAFKA_SERVERS,
                AcquisitionConstant.CS_WEATHER_TOPIC,
                message);

        String status = succeeded ? "SUCCESS" : "ERROR";
        return status + delimiter + DateUtil.getNowDateTime();
    }
}
